package elastic.onestep.index;


import com.alibaba.fastjson.JSONObject;
import elastic.onestep.config.Global;
import elastic.onestep.factory.ClientFactory;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.*;
import java.util.concurrent.TimeUnit;

/**
 * unclewang
 * 2018/5/4 15:57
 */
public class EsAuthorIndexer {
    private Settings.Builder settings = Settings.builder()
            .put("index.number_of_shards", Global.getEsNumOfShards())
            .put("index.number_of_replicas", Global.getEsNumOfReplicas());

    protected static Client client = ClientFactory.get();
    protected String indice;
    protected String type;
    protected String filepath;

    public EsAuthorIndexer() {
    }

    public EsAuthorIndexer(String indice, String type, String filepath) {
        this.indice = indice;
        this.type = type;
        this.filepath = filepath;
    }

    /**
     * 删除索引
     */
    private boolean deleteIndex() {
        return client.admin().indices().prepareDelete(indice).get().isAcknowledged();
    }

    /**
     * 判断指定的索引名是否存在
     */
    private boolean isExistsIndex() {
        IndicesExistsResponse response = client.admin().indices().
                exists(new IndicesExistsRequest().indices(indice)).actionGet();
        return response.isExists();
    }

    /**
     * 判断type是否存在
     */
    private boolean isExistsType() {
        TypesExistsResponse typesExistsResponse = client.admin().indices()
                .typesExists(new TypesExistsRequest(new String[]{indice}, type)).actionGet();
        return typesExistsResponse.isExists();
    }

    /**
     * 创建索引
     */
    private boolean createIndex() {
        CreateIndexResponse cir = client.admin().indices().prepareCreate(indice).setSettings(settings).get();
        return cir.isAcknowledged();
    }

    /**
     * 检查索引
     */
    private void checkIndex() {
        if (!isExistsIndex()) {
            createIndex();
        }
        if (!isExistsType()) {
            //map();
        }
    }

    public void map() {
        PutMappingResponse pmr = null;
        try {
            XContentBuilder mapping = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject("properties")
                    .startObject("id").field("type", "keyword").field("store", true).endObject()
                    .startObject("author").field("type", "keyword").field("store", true).endObject()
                    .startObject("org").field("type", "keyword").field("store", true).endObject()
                    .endObject()
                    .endObject();
            System.out.println(mapping);
            pmr = client.admin().indices().preparePutMapping(indice)
                    .setType(type)
                    .setSource(mapping).get();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("@@put mapping@@, ack:" + pmr.isAcknowledged());
    }

    protected UpdateRequest upsert(String id, String source) {
        return new UpdateRequest(indice, type, id).doc(source, XContentType.JSON).upsert(source, XContentType.JSON);
    }

    /**
     * 获取 BulkProcessor
     */
    protected static BulkProcessor getBulkProcessor() {
        BulkProcessor bp = BulkProcessor.builder(
                client, new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long l, BulkRequest bulkRequest) {
                        System.out.println(bulkRequest.numberOfActions());
                    }

                    @Override
                    public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                        System.out.println("@@bulk success@@, execId:" + l + ", bulk size:" + bulkResponse.getItems().length);
                    }

                    @Override
                    public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                        System.out.println("@@bulk failure@@, execId:" + l + ", failure:" + throwable.getMessage());
                    }
                }
        )
                //
                .setBulkActions(Global.getEsBulkActions())
                .setBulkSize(new ByteSizeValue(Global.getEsBulkSize(), ByteSizeUnit.MB))
                .setConcurrentRequests(1)
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
        return bp;
    }

    private void processFile(File file, BulkProcessor bulkProcessor) {
        try {
            if (file.isFile()) {
                // 需要处理的是单个文件
                System.out.println("@@indexing@@,[" + file.getCanonicalPath() + "]");
                BufferedReader reader = new BufferedReader(new FileReader(file));
                String record = null;
                while ((record = reader.readLine()) != null) {
                    JSONObject json = JSONObject.parseObject(record);
                    bulkProcessor.add(upsert(json.getString("id"), record));
                }
            } else {
                for (File f : file.listFiles()) {
                    processFile(f, bulkProcessor);
                }
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    private void bulkIndex() {
        BulkProcessor bulkProcessor = getBulkProcessor();
        File file = new File(filepath);
        if (file.exists()) {
            processFile(file, bulkProcessor);
            bulkProcessor.flush();
            try {
                bulkProcessor.awaitClose(5, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println(filepath + " doesn't exist! ");
        }
    }

    public void index(){
        System.out.println("开始创建索引");
        checkIndex();
        bulkIndex();

    }
    public static void main(String[] args) {

        EsAuthorIndexer eai = new EsAuthorIndexer("test", "author", "/Users/unclewang/Idea_Projects/expert/author2json.json");
        eai.index();

    }
}
